import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.call;
import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Small batch Table API example: builds a three-element {@code Order} data set,
 * groups it by {@code user} and applies the aggregate function registered as
 * {@code "myAggFunc"} to the {@code amount} column.
 *
 * <p>{@code Order}, {@code MyMinMax} and {@code MyMinMaxAcc} are declared
 * outside this class.
 */
public class Aggregate {

    /**
     * Runs the example and prints the aggregated rows.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception propagated from the Flink calls made here
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        DataSet<Order> orders = sampleOrders(env);

        // The function must be registered under the same name that call(...) uses below.
        AggregateFunction<Row, MyMinMaxAcc> minMaxFunction = new MyMinMax();
        tEnv.registerFunction("myAggFunc", minMaxFunction);

        Table orderTable = tEnv.fromDataSet(orders, $("user"), $("product"), $("amount"));

        // The aggregate result is exposed as two columns named "min" and "max";
        // presumably the smallest and largest amount per user -- confirm against MyMinMax.
        Table perUser = orderTable
                .groupBy($("user"))
                .aggregate(call("myAggFunc", $("amount")).as("min", "max"))
                .select($("user"), $("min"), $("max"));

        DataSet<Row> resultRows = tEnv.toDataSet(perUser, Row.class);
        resultRows.print();
    }

    /** Builds the fixed in-memory input used by this example. */
    private static DataSet<Order> sampleOrders(ExecutionEnvironment env) {
        return env.fromCollection(Arrays.asList(
                new Order(1L, "beer", 3),
                new Order(3L, "rubber", 2),
                new Order(1L, "diaper", 4)));
    }
}
